package com.denghq.projectbuilder.component.msgbus.autoconfigure.rabbitmq;

import com.denghq.projectbuilder.component.amqp.Configuration.RabbitConfigurationProperties;
import com.denghq.projectbuilder.component.msgbus.autoconfigure.rabbitmq.property.ExchangeBindProperty;
import com.denghq.projectbuilder.component.msgbus.receiver.impl.BindRoutingKey;
import com.denghq.projectbuilder.component.msgbus.receiver.impl.RabbitMqMsgReceiver;
import com.denghq.projectbuilder.component.msgbus.sender.impl.RabbitMqMsgSender;
import com.denghq.projectbuilder.component.msgbus.autoconfigure.rabbitmq.property.MsgBusProperties;
import com.denghq.projectbuilder.component.msgbus.bus.IMsgProcessor;
import com.denghq.projectbuilder.component.msgbus.bus.impl.RabbitMqMsgBus;
import com.denghq.projectbuilder.component.msgbus.sender.IMsgSendEventHandler;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * 消息总线自动配置类
 */
@Configuration
@EnableConfigurationProperties({MsgBusProperties.class, RabbitProperties.class})
@ConditionalOnProperty(prefix = "com.denghq.component.msgbus.rabbitmq", name = "enable", havingValue = "true", matchIfMissing = true)
public class MsgBusAutoConfiguration {

    private final MsgBusProperties properties;

    @Autowired(required = false)
    private List<IMsgProcessor> msgProcessors;

    private final RabbitConfigurationProperties rabbitProperties;

    @Autowired(required = false)
    IMsgSendEventHandler msgSendEventHandler;

    public MsgBusAutoConfiguration(MsgBusProperties properties, RabbitConfigurationProperties rabbitProperties) {
        this.rabbitProperties = rabbitProperties;
        this.properties = properties;
    }

    /*@Bean
    @ConditionalOnMissingBean
    public IMsgSendEventHandler msgSendEventHandler() {

        return new SimpleMsgSendEventHandler() {
            @Override
            public void onSuccessEvent(IMsgSendEvent event) {
                super.onSuccessEvent(event);
            }
        };
    }*/

    @Bean
    @ConditionalOnMissingBean
    public CachingConnectionFactory connectionFactory(RabbitProperties rabbitProperties) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitProperties.getHost(),
                rabbitProperties.getPort());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        return connectionFactory;
    }

    @Bean
    @ConditionalOnMissingBean
    public RabbitMqMsgBus rabbitMqMsgBus(RabbitMqMsgSender sender, RabbitMqMsgReceiver receiver) {

        return new RabbitMqMsgBus(sender, receiver);

    }

    @Bean
    @ConditionalOnMissingBean
    public MsgBusSubscribeBuilder rabbitMqMsgBusSubscribeBuilder(RabbitMqMsgBus msgBus) {
        return new MsgBusSubscribeBuilder(msgBus, msgProcessors);
    }

    @Bean
    @ConditionalOnMissingBean
    public RabbitMqMsgSender rabbitMqMsgSender(CachingConnectionFactory connectionFactory) {
        RabbitMqMsgSender mqMsgSender = new RabbitMqMsgSender(connectionFactory);
        mqMsgSender.setSendEventHandler(msgSendEventHandler);

        Optional.ofNullable(properties.getSender()).ifPresent(sender -> {

            if (StringUtils.isNotBlank(sender.getDefaultExchange())) {
                mqMsgSender.setDefaultExchange(sender.getDefaultExchange());
            } else {
                mqMsgSender.setDefaultExchange(rabbitProperties.getExchangeName());
            }
            if (!CollectionUtils.isEmpty(sender.getTopics())) {
                Map<String, String> topicExchangeMap = Maps.newHashMap();
                sender.getTopics().forEach(topic ->
                        topicExchangeMap.put(topic.getName(), topic.getExchange())
                );
                mqMsgSender.setTopicExchangeMap(topicExchangeMap);
            }

        });
        if (properties.getSender() == null) {
            mqMsgSender.setDefaultExchange(rabbitProperties.getExchangeName());
        }
        return mqMsgSender;
    }

    @Bean
    @ConditionalOnMissingBean
    public RabbitMqMsgReceiver rabbitMqMsgReceiver(CachingConnectionFactory connectionFactory) {

        RabbitMqMsgReceiver.Builder builder = new RabbitMqMsgReceiver.Builder(connectionFactory);

        Optional.ofNullable(properties.getQueueList()).ifPresent(queues -> {
                    List<Queue> declareQueueList = Lists.newArrayList();
                    queues.forEach(q ->
                            declareQueueList.add(new Queue(q.getName(), q.isDurable(), q.isExclusive(), q.isAutoDelete(), q.getArguments()))
                    );
                    builder.declareQueueList(declareQueueList);
                }
        );
        Optional.ofNullable(properties.getMsgReceiver()).ifPresent(receiverProperty -> {

            Optional.ofNullable(receiverProperty.getMaxMsgProcessThreadNum()).ifPresent(o ->
                    builder.maxMsgProcessThreadNum(o)
            );

            Optional.ofNullable(properties.getExchangeBinder()).ifPresent(o -> {
                if (!CollectionUtils.isEmpty(o)) {
                    List<Exchange> declareExchangeList = Lists.newArrayList();
                    List<BindRoutingKey> bindRoutingKeyList = Lists.newArrayList();
                    Set<String> consumeQueueSet = Sets.newHashSet();
                    o.forEach(bind -> {
                        if (StringUtils.isBlank(bind.getName())) {
                            bind.setName(rabbitProperties.getExchangeName());
                        }
                        declareExchangeList.add(getDeclareExchange(bind));
                        if (!CollectionUtils.isEmpty(bind.getBindings())) {
                            bind.getBindings().forEach(bindingKey -> {
                                bindRoutingKeyList.add(new BindRoutingKey(bindingKey.getQueueName(), bind.getName(), bindingKey.getRoutingKey()));
                                consumeQueueSet.add(bindingKey.getQueueName());
                            });
                            builder.consumeQueueList(consumeQueueSet.toArray(new String[0]));
                        }
                    });
                    builder.bindRoutingKeyList(bindRoutingKeyList);
                    builder.declareExchangeList(declareExchangeList);
                }

            });

            Optional.ofNullable(receiverProperty.getMaxMsgProcessThreadNum()).ifPresent(o ->
                    builder.maxMsgProcessThreadNum(o)
            );

            Optional.ofNullable(receiverProperty.getMaxConcurrentConsumers()).ifPresent(o ->
                    builder.maxConcurrentConsumers(o)
            );

            Optional.ofNullable(receiverProperty.getConcurrentConsumers()).ifPresent(o ->
                    builder.concurrentConsumers(o)
            );
        });
        return builder.build();
    }

    private Exchange getDeclareExchange(ExchangeBindProperty bind) {
        if (ExchangeTypes.DIRECT.equals(bind.getType())) {
            return new DirectExchange(bind.getName());
        } else if (ExchangeTypes.FANOUT.equals(bind.getType())) {
            return new FanoutExchange(bind.getName());
        } else if (ExchangeTypes.TOPIC.equals(bind.getType())) {
            return new TopicExchange(bind.getName());
        } else if (ExchangeTypes.HEADERS.equals(bind.getType())) {
            return new HeadersExchange(bind.getName());
        }
        //默认返回topic类型
        return new TopicExchange(bind.getName());
    }
}
